package app.sublive.tira.im.lib.processor;

import app.sublive.tira.base.logger.Logger;
import app.sublive.tira.im.lib.Client;
import app.sublive.tira.im.lib.entity.Message;
import app.sublive.tira.im.lib.exception.ConnectionBrokenException;
import app.sublive.tira.im.lib.exception.PacketBrokenException;
import com.google.protobuf.MessageLite;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: classes4.dex */
public abstract class AbstractSignalSynchronizer implements IMessageSynchronizer {
    private Client client;
    private Map<Integer, Class> cmdTypes = new ConcurrentHashMap();
    private DownstreamSignalThreadProcessor downstreamMsgThread;
    private final BlockingQueue<Message<?>> downstreamQueue;
    protected boolean running;
    private UpstreamMessageThreadProcessor upstreamMsgThread;
    private final BlockingQueue<Message> upstreamQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes4.dex */
    public final class DownstreamSignalThreadProcessor extends Thread {
        public boolean processing;

        private DownstreamSignalThreadProcessor() {
            super("signal-reader");
            this.processing = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            this.processing = false;
            try {
                interrupt();
            } catch (Exception unused) {
            }
        }

        public boolean isRunning() {
            return !this.processing;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.processing) {
                try {
                    AbstractSignalSynchronizer abstractSignalSynchronizer = AbstractSignalSynchronizer.this;
                    if (!abstractSignalSynchronizer.running) {
                        break;
                    }
                    try {
                        AbstractSignalSynchronizer.this.onRecvSignalMessage((Message) abstractSignalSynchronizer.downstreamQueue.take());
                        AbstractSignalSynchronizer.this.sync();
                    } catch (InterruptedException unused) {
                        AbstractSignalSynchronizer.this.running = false;
                        this.processing = false;
                    } catch (Exception e) {
                        e.printStackTrace();
                        AbstractSignalSynchronizer.this.onRecvSignalError(e);
                    }
                } finally {
                    AbstractSignalSynchronizer.this.downstreamQueue.clear();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes4.dex */
    public final class UpstreamMessageThreadProcessor extends Thread {
        public boolean processing;

        private UpstreamMessageThreadProcessor() {
            super("signal-synchronizer-writter");
            this.processing = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            this.processing = false;
            try {
                interrupt();
            } catch (Exception unused) {
            }
        }

        public boolean isRunning() {
            return !this.processing;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Message message;
            AbstractSignalSynchronizer abstractSignalSynchronizer;
            PacketBrokenException packetBrokenException;
            while (this.processing) {
                try {
                    AbstractSignalSynchronizer abstractSignalSynchronizer2 = AbstractSignalSynchronizer.this;
                    if (!abstractSignalSynchronizer2.running) {
                        break;
                    }
                    try {
                        try {
                            message = (Message) abstractSignalSynchronizer2.upstreamQueue.take();
                        } catch (InterruptedException unused) {
                            AbstractSignalSynchronizer.this.running = false;
                            this.processing = false;
                        }
                    } catch (ConnectionBrokenException e) {
                        AbstractSignalSynchronizer.this.client.notifyEventError(new ConnectionBrokenException(e.getMessage()));
                        AbstractSignalSynchronizer.this.client.disconnect(2, 1);
                    } catch (Exception e2) {
                        e2.printStackTrace();
                        AbstractSignalSynchronizer.this.onSendMessageError(e2);
                    }
                    if (message.getId() <= 0) {
                        abstractSignalSynchronizer = AbstractSignalSynchronizer.this;
                        packetBrokenException = new PacketBrokenException();
                    } else if (message.getCmd() <= 0) {
                        abstractSignalSynchronizer = AbstractSignalSynchronizer.this;
                        packetBrokenException = new PacketBrokenException();
                    } else if (AbstractSignalSynchronizer.this.client.checkProtocolVersion(message.getProtocolVersion())) {
                        AbstractSignalSynchronizer.this.client.sendMessage(message);
                        AbstractSignalSynchronizer.this.onSendMessageSuccess(message);
                    } else {
                        abstractSignalSynchronizer = AbstractSignalSynchronizer.this;
                        packetBrokenException = new PacketBrokenException();
                    }
                    abstractSignalSynchronizer.onSendMessageError(message, packetBrokenException);
                } finally {
                    AbstractSignalSynchronizer.this.upstreamQueue.clear();
                }
            }
        }
    }

    public AbstractSignalSynchronizer(int i) {
        this.downstreamQueue = new ArrayBlockingQueue(i);
        this.upstreamQueue = new ArrayBlockingQueue(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onSendMessageError(Message message, Throwable th) {
        Logger.e(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onSendMessageError(Throwable th) {
        Logger.e(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onSendMessageSuccess(Message message) {
        Logger.i("send message:" + message.toString() + " success");
    }

    private void release() {
        this.running = false;
        DownstreamSignalThreadProcessor downstreamSignalThreadProcessor = this.downstreamMsgThread;
        if (downstreamSignalThreadProcessor != null) {
            downstreamSignalThreadProcessor.close();
        }
        UpstreamMessageThreadProcessor upstreamMessageThreadProcessor = this.upstreamMsgThread;
        if (upstreamMessageThreadProcessor != null) {
            upstreamMessageThreadProcessor.close();
        }
    }

    @Override // app.sublive.tira.im.lib.processor.IProcessor
    public synchronized void close() {
        release();
    }

    @Override // app.sublive.tira.im.lib.processor.IProcessor
    public Client getClient() {
        return this.client;
    }

    @Override // app.sublive.tira.im.lib.processor.ICommonReader
    public List<Integer> getCmdTypes() {
        ArrayList arrayList = new ArrayList();
        for (Object obj : this.cmdTypes.keySet().toArray()) {
            arrayList.add((Integer) obj);
        }
        return arrayList;
    }

    @Override // app.sublive.tira.im.lib.processor.IMessageSynchronizer
    public Class getMessageType(Integer num) {
        return this.cmdTypes.get(num);
    }

    public abstract Message getSyncMessage();

    public synchronized boolean isRunning() {
        return this.running;
    }

    public abstract void onRecvSignalError(Throwable th);

    public abstract void onRecvSignalMessage(Message message);

    @Override // app.sublive.tira.im.lib.processor.IMessageSynchronizer
    public void putMessage(Integer num, Message message) {
        this.downstreamQueue.offer(message);
    }

    public void sendMessage(Message message) {
        this.upstreamQueue.offer(message);
    }

    @Override // app.sublive.tira.im.lib.processor.IProcessor
    public void setClient(Client client) {
        this.client = client;
    }

    public void setCommandTypes(Map<Integer, Class<? extends MessageLite>> map) {
        this.cmdTypes.putAll(map);
    }

    @Override // app.sublive.tira.im.lib.processor.IProcessor
    public synchronized void shutdown() {
        try {
            this.upstreamQueue.clear();
        } catch (Exception unused) {
        }
        release();
    }

    @Override // app.sublive.tira.im.lib.processor.IProcessor
    public synchronized void startup() {
        if (this.running) {
            release();
        }
        this.running = true;
        this.downstreamMsgThread = new DownstreamSignalThreadProcessor();
        this.upstreamMsgThread = new UpstreamMessageThreadProcessor();
        this.downstreamMsgThread.start();
        this.upstreamMsgThread.start();
    }

    public synchronized void sync() {
        Logger.d("start signal local sync");
        sendMessage(getSyncMessage());
    }
}
